09. ForkJoin Pools

In this section, you'll learn about some of the advantages of ForkJoinPool and how to use it.

What is a ForkJoinPool?

ForkJoinPool is a specialized kind of thread pool that has the following advantages over traditional thread pools:

  • It uses a technique called work stealing, in which an idle worker thread takes queued tasks from a busier worker, so that idle worker threads can find work to do.
  • Its API is optimized for asynchronous work that creates more work. You might also hear this called recursive work.

In practice, work stealing usually has only a modest effect on performance, because "traditional" thread pools already do a good job of distributing work across their worker threads. However, depending on the kind of asynchronous tasks your program creates, work stealing may give an extra efficiency boost.
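Work stealing happens inside the pool, so the easiest way to glimpse it is through the pool's own statistics. The sketch below is illustrative: RangeSum, StealDemo, and the 10,000-element threshold are assumptions, not part of the lesson. It splits a range sum into subtasks and prints ForkJoinPool.getStealCount(), which the JDK documents only as an estimate, so the number varies between runs and may be small or zero on a lightly loaded machine.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums the integers in [from, to), splitting the range in half until it is small enough.
class RangeSum extends RecursiveTask<Long> {
  // Assumed cutoff below which the range is summed sequentially.
  private static final long THRESHOLD = 10_000;
  private final long from;
  private final long to;

  RangeSum(long from, long to) {
    this.from = from;
    this.to = to;
  }

  @Override
  protected Long compute() {
    if (to - from <= THRESHOLD) {
      long sum = 0;
      for (long i = from; i < to; i++) {
        sum += i;
      }
      return sum;
    }
    long mid = from + (to - from) / 2;
    RangeSum left = new RangeSum(from, mid);
    RangeSum right = new RangeSum(mid, to);
    // Subtasks queued by one worker can be stolen by workers that have run out of work.
    invokeAll(left, right);
    return left.join() + right.join();
  }
}

class StealDemo {
  static long sum(long n, int parallelism) {
    ForkJoinPool pool = new ForkJoinPool(parallelism);
    try {
      long total = pool.invoke(new RangeSum(0, n));
      // getStealCount() is only an estimate, and it varies from run to run.
      System.out.println("approximate steals: " + pool.getStealCount());
      return total;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(sum(10_000_000, 4)); // sum of 0 .. 9,999,999
  }
}
```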

ForkJoinTasks

When you create work to submit to a ForkJoinPool, you usually do so by subclassing either RecursiveTask or RecursiveAction.

Use RecursiveTask for asynchronous work that returns a value, and use RecursiveAction when the asynchronous computation does not return a value.
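As a minimal sketch of the difference, assuming an int array as the input (DoubleAll, MaxOf, TaskKinds, and the threshold are hypothetical names and values): the RecursiveAction changes the array in place and its compute() returns void, while the RecursiveTask<Integer> returns a value that the caller combines with its siblings' results.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

// RecursiveAction: doubles every element of an array slice in place and returns nothing.
class DoubleAll extends RecursiveAction {
  private static final int THRESHOLD = 1_000; // assumed cutoff for sequential work
  private final int[] data;
  private final int from;
  private final int to;

  DoubleAll(int[] data, int from, int to) {
    this.data = data;
    this.from = from;
    this.to = to;
  }

  @Override
  protected void compute() {
    if (to - from <= THRESHOLD) {
      for (int i = from; i < to; i++) {
        data[i] *= 2;
      }
      return;
    }
    int mid = (from + to) >>> 1;
    invokeAll(new DoubleAll(data, from, mid), new DoubleAll(data, mid, to));
  }
}

// RecursiveTask<Integer>: returns the largest element of a non-empty array slice.
class MaxOf extends RecursiveTask<Integer> {
  private static final int THRESHOLD = 1_000;
  private final int[] data;
  private final int from;
  private final int to;

  MaxOf(int[] data, int from, int to) {
    this.data = data;
    this.from = from;
    this.to = to;
  }

  @Override
  protected Integer compute() {
    if (to - from <= THRESHOLD) {
      int max = data[from];
      for (int i = from + 1; i < to; i++) {
        max = Math.max(max, data[i]);
      }
      return max;
    }
    int mid = (from + to) >>> 1;
    MaxOf left = new MaxOf(data, from, mid);
    MaxOf right = new MaxOf(data, mid, to);
    invokeAll(left, right);
    return Math.max(left.join(), right.join());
  }
}

class TaskKinds {
  public static void main(String[] args) {
    int[] data = new int[10_000];
    Arrays.setAll(data, i -> i);
    ForkJoinPool pool = ForkJoinPool.commonPool();
    pool.invoke(new DoubleAll(data, 0, data.length)); // no result to return
    int max = pool.invoke(new MaxOf(data, 0, data.length));
    System.out.println(max); // 19998
  }
}
```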

The ForkJoinPool API is optimized for recursive work, which is work that creates other work.

When you are implementing the compute() method of a RecursiveAction or RecursiveTask, you can submit more work to the thread pool by calling the invoke() method or one of its many variants, such as invokeAll(). You can also call fork() to start a subtask asynchronously. Once you start the recursive work, your RecursiveAction or RecursiveTask can wait for the results and use them in its own computation (invoke() and invokeAll() wait automatically; after fork(), you call join()), or it can proceed without joining the results.
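The fork()/join() style can be sketched with the classic recursive Fibonacci example; a similar example appears in the RecursiveTask Javadoc. Fib and its cutoff of 15 are illustrative choices, and this demonstrates the API rather than an efficient way to compute Fibonacci numbers. Each task forks one branch so it can run asynchronously (and be stolen by an idle worker), computes the other branch on the current thread, and then joins the forked branch.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Computes Fibonacci numbers recursively, forking one branch and computing the other directly.
class Fib extends RecursiveTask<Long> {
  // Assumed cutoff: below this, recursion stays on the current thread.
  private static final int SEQUENTIAL_CUTOFF = 15;
  private final int n;

  Fib(int n) {
    this.n = n;
  }

  private static long sequentialFib(int n) {
    return n < 2 ? n : sequentialFib(n - 1) + sequentialFib(n - 2);
  }

  @Override
  protected Long compute() {
    if (n <= SEQUENTIAL_CUTOFF) {
      return sequentialFib(n);
    }
    Fib first = new Fib(n - 1);
    first.fork();                            // start asynchronously; this task keeps running
    long second = new Fib(n - 2).compute();  // do the other half on the current thread
    return first.join() + second;            // wait for the forked half, then combine
  }

  public static void main(String[] args) {
    System.out.println(ForkJoinPool.commonPool().invoke(new Fib(30))); // 832040
  }
}
```

Computing one half directly, instead of forking both halves, keeps the current worker busy while the forked half is pending.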

ForkJoinPool also supports the "normal" ExecutorService methods submit() and execute().
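For work that doesn't need the recursive API, the familiar ExecutorService-style calls work on a ForkJoinPool too. In this sketch (NormalMethods and its methods are hypothetical names), submit() takes a Callable and returns a ForkJoinTask, which also implements Future, while execute() takes a Runnable and returns nothing, so a CountDownLatch stands in as a completion signal.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;

class NormalMethods {
  // submit(Callable) returns a ForkJoinTask, which also implements Future.
  static int submitAndJoin() {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      ForkJoinTask<Integer> task = pool.submit(() -> 6 * 7);
      return task.join(); // Future.get() also works, but join() throws no checked exceptions
    } finally {
      pool.shutdown();
    }
  }

  // execute(Runnable) is fire-and-forget, so a latch is used to learn when the work ran.
  static boolean executeAndWait() {
    ForkJoinPool pool = new ForkJoinPool();
    CountDownLatch done = new CountDownLatch(1);
    try {
      pool.execute(done::countDown);
      return done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(submitAndJoin()); // 42
    System.out.println(executeAndWait());
  }
}
```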

ForkJoinPool Demo

In this demo, we took a recursive, sequential algorithm and parallelized it using a ForkJoinPool.

Code from the Demo

CountWordsTask.java

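import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Counts occurrences of a word in a file, or in every file under a directory.
public final class CountWordsTask extends RecursiveTask<Long> {
  private final Path path;
  private final String word;

  public CountWordsTask(Path path, String word) {
    this.path = path;
    this.word = word;
  }

  @Override
  protected Long compute() {
    // Base case: a regular file is counted directly on the current worker thread.
    if (!Files.isDirectory(path)) {
      return WordCounter.countWordInFile(path, word);
    }
    // Recursive case: create one subtask per directory entry.
    List<CountWordsTask> subtasks;
    try (Stream<Path> subpaths = Files.list(path)) { // closes the directory stream when done
      subtasks = subpaths
          .map(subpath -> new CountWordsTask(subpath, word))
          .collect(Collectors.toList());
    } catch (IOException e) {
      return 0L; // an unreadable directory contributes nothing to the count
    }
    // Run all subtasks in the pool and wait for them to finish.
    invokeAll(subtasks);
    // Every subtask has completed, so join() returns its result without blocking.
    return subtasks
        .stream()
        .mapToLong(CountWordsTask::join)
        .sum();
  }
}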

WordCounter.java

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;

public final class WordCounter {
  public static void main(String[] args) {
    if (args.length != 2) {
      System.out.println("Usage: WordCounter [path] [word]");
      return;
    }
    Path start = Path.of(args[0]);
    String word = args[1];

    Instant before = Instant.now();

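    // ForkJoinPool worker threads are daemon threads, so the program can exit
    // without an explicit shutdown() once invoke() has returned.
    ForkJoinPool pool = new ForkJoinPool();
    long count = pool.invoke(new CountWordsTask(start, word));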

    Duration elapsed = Duration.between(before, Instant.now());
    System.out.println(count + " (" + elapsed.toSeconds() + " seconds)");
  }

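  // Counts space-separated tokens in the file that equal the word, ignoring case.
  // Files that cannot be read (including files that are not valid UTF-8) count as 0.
  public static long countWordInFile(Path file, String word) {
    try {
      return Files.readAllLines(file, StandardCharsets.UTF_8)
          .stream()
          .flatMap(l -> Arrays.stream(l.split(" ")))
          .filter(word::equalsIgnoreCase)
          .count();
    } catch (IOException e) {
      return 0;
    }
  }

  // The original sequential, recursive algorithm, kept for comparison with CountWordsTask.
  private static long countWords(Path path, String word) {
    if (!Files.isDirectory(path)) {
      return countWordInFile(path, word);
    }
    // try-with-resources closes the directory stream returned by Files.list().
    try (var subpaths = Files.list(path)) {
      return subpaths
          .mapToLong(p -> countWords(p, word))
          .sum();
    } catch (IOException e) {
      return 0;
    }
  }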
}
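To see the sequential-to-parallel transformation in isolation, the sketch below swaps the file system for a hypothetical in-memory Node tree (Node, TreeSum, and Task are illustrative names, not part of the demo). TreeSum.sequential() follows the shape of WordCounter.countWords, and Task follows the shape of CountWordsTask; both should return the same total.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;

// Hypothetical stand-in for a directory tree: a node with no children plays the role of a file.
final class Node {
  final long value;          // the "word count" of a file; 0 for directories
  final List<Node> children; // empty for files (an empty directory also counts as 0)

  private Node(long value, List<Node> children) {
    this.value = value;
    this.children = children;
  }

  static Node leaf(long value) {
    return new Node(value, List.of());
  }

  static Node dir(Node... children) {
    return new Node(0, List.of(children));
  }

  boolean isLeaf() {
    return children.isEmpty();
  }
}

class TreeSum {
  // Sequential recursion, shaped like WordCounter.countWords.
  static long sequential(Node node) {
    if (node.isLeaf()) {
      return node.value;
    }
    return node.children.stream().mapToLong(TreeSum::sequential).sum();
  }

  // Parallel recursion, shaped like CountWordsTask: one subtask per child.
  static final class Task extends RecursiveTask<Long> {
    private final Node node;

    Task(Node node) {
      this.node = node;
    }

    @Override
    protected Long compute() {
      if (node.isLeaf()) {
        return node.value;
      }
      List<Task> subtasks = node.children.stream().map(Task::new).collect(Collectors.toList());
      invokeAll(subtasks);
      return subtasks.stream().mapToLong(Task::join).sum();
    }
  }

  static long parallel(Node root) {
    return ForkJoinPool.commonPool().invoke(new Task(root));
  }

  public static void main(String[] args) {
    Node root = Node.dir(Node.leaf(3), Node.dir(Node.leaf(4), Node.leaf(5)), Node.dir());
    System.out.println(sequential(root) + " " + parallel(root)); // 12 12
  }
}
```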

What are some good reasons to use ForkJoinPool instead of a "traditional" ExecutorService thread pool?

SOLUTION:
  • Work stealing can reduce the number of idle worker threads.
  • The ForkJoin API is optimized for recursive work tasks.